home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / lib / python2.6 / dist-packages / apport / fileutils.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-10-12  |  17.1 KB  |  477 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. '''Functions to manage apport problem report files.
  5.  
  6. Copyright (C) 2006 Canonical Ltd.
  7. Author: Martin Pitt <martin.pitt@ubuntu.com>
  8.  
  9. This program is free software; you can redistribute it and/or modify it
  10. under the terms of the GNU General Public License as published by the
  11. Free Software Foundation; either version 2 of the License, or (at your
  12. option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
  13. the full text of the license.
  14. '''
  15. import os
  16. import glob
  17. import subprocess
  18. import os.path as os
  19. import ConfigParser
  20. from problem_report import ProblemReport
  21. from packaging_impl import impl as packaging
  22. report_dir = os.environ.get('APPORT_REPORT_DIR', '/var/crash')
  23. _config_file = '~/.config/apport/settings'
  24.  
  25. def find_package_desktopfile(package):
  26.     '''If given package is installed and has a single .desktop file, return the
  27.     path to it, otherwise return None.'''
  28.     if package is None:
  29.         return None
  30.     desktopfile = None
  31.     for line in packaging.get_files(package):
  32.         if line.endswith('.desktop'):
  33.             if desktopfile:
  34.                 return None
  35.             desktopfile = line
  36.             continue
  37.         desktopfile
  38.     
  39.     return desktopfile
  40.  
  41.  
  42. def likely_packaged(file):
  43.     '''Check whether the given file is likely to belong to a package.
  44.  
  45.     This is semi-decidable: A return value of False is definitive, a True value
  46.     is only a guess which needs to be checked with find_file_package().
  47.     However, this function is very fast and does not access the package
  48.     database.'''
  49.     pkg_whitelist = [
  50.         '/bin/',
  51.         '/boot',
  52.         '/etc/',
  53.         '/initrd',
  54.         '/lib',
  55.         '/sbin/',
  56.         '/usr/',
  57.         '/var']
  58.     whitelist_match = False
  59.     for i in pkg_whitelist:
  60.         if file.startswith(i):
  61.             whitelist_match = True
  62.             break
  63.             continue
  64.     
  65.     if whitelist_match and not file.startswith('/usr/local/'):
  66.         pass
  67.     return not file.startswith('/var/lib/schroot')
  68.  
  69.  
  70. def find_file_package(file):
  71.     '''Return the package that ships the given file (or None if no package
  72.     ships it).'''
  73.     (dir, name) = os.path.split(file)
  74.     resolved_dir = os.path.realpath(dir)
  75.     if os.path.isdir(resolved_dir):
  76.         file = os.path.join(resolved_dir, name)
  77.     
  78.     if not likely_packaged(file):
  79.         return None
  80.     return packaging.get_file_package(file)
  81.  
  82.  
  83. def seen_report(report):
  84.     '''Check whether the given report file has already been processed
  85.     earlier.'''
  86.     st = os.stat(report)
  87.     if not st.st_atime > st.st_mtime:
  88.         pass
  89.     return st.st_size == 0
  90.  
  91.  
  92. def mark_report_seen(report):
  93.     '''Mark given report file as seen.'''
  94.     st = os.stat(report)
  95.     
  96.     try:
  97.         os.utime(report, (st.st_mtime, st.st_mtime - 1))
  98.     except OSError:
  99.         timeout = 12
  100.         while timeout > 0:
  101.             f = open(report)
  102.             f.read(1)
  103.             f.close()
  104.             
  105.             try:
  106.                 st = os.stat(report)
  107.             except OSError:
  108.                 return None
  109.  
  110.             if st.st_atime > st.st_mtime:
  111.                 break
  112.             
  113.             time.sleep(0.1)
  114.             timeout -= 1
  115.         if timeout == 0:
  116.             delete_report(report)
  117.         
  118.     except:
  119.         timeout == 0
  120.  
  121.  
  122.  
  123. def get_all_reports():
  124.     '''Return a list with all report files which are accessible to the calling
  125.     user.'''
  126.     reports = []
  127.     for r in glob.glob(os.path.join(report_dir, '*.crash')):
  128.         
  129.         try:
  130.             if os.path.getsize(r) > 0 and os.access(r, os.R_OK):
  131.                 reports.append(r)
  132.         continue
  133.         except OSError:
  134.             continue
  135.         
  136.  
  137.     
  138.     return reports
  139.  
  140.  
  141. def get_new_reports():
  142.     '''Return a list with all report files which have not yet been processed
  143.     and are accessible to the calling user.'''
  144.     return _[1]
  145.  
  146.  
  147. def get_all_system_reports():
  148.     '''Return a list with all report files which belong to a system user (i. e.
  149.     uid < 500 according to LSB).'''
  150.     reports = []
  151.     for r in glob.glob(os.path.join(report_dir, '*.crash')):
  152.         
  153.         try:
  154.             if os.path.getsize(r) > 0 and os.stat(r).st_uid < 500:
  155.                 reports.append(r)
  156.         continue
  157.         except OSError:
  158.             continue
  159.         
  160.  
  161.     
  162.     return reports
  163.  
  164.  
  165. def get_new_system_reports():
  166.     '''Return a list with all report files which have not yet been processed
  167.     and belong to a system user (i. e. uid < 500 according to LSB).'''
  168.     return _[1]
  169.  
  170.  
  171. def delete_report(report):
  172.     '''Delete the given report file.
  173.  
  174.     If unlinking the file fails due to a permission error (if report_dir is not
  175.     writable to normal users), the file will be truncated to 0 bytes instead.'''
  176.     
  177.     try:
  178.         os.unlink(report)
  179.     except OSError:
  180.         open(report, 'w').truncate(0)
  181.  
  182.  
  183.  
  184. def get_recent_crashes(report):
  185.     '''Return the number of recent crashes for the given report file.
  186.  
  187.     Return the number of recent crashes (currently, crashes which happened more
  188.     than 24 hours ago are discarded).'''
  189.     pr = ProblemReport()
  190.     pr.load(report, False)
  191.     
  192.     try:
  193.         count = int(pr['CrashCounter'])
  194.         report_time = time.mktime(time.strptime(pr['Date']))
  195.         cur_time = time.mktime(time.localtime())
  196.         if cur_time - report_time > 86400:
  197.             return 0
  198.         return count
  199.     except (ValueError, KeyError):
  200.         return 0
  201.  
  202.  
  203.  
  204. def make_report_path(report, uid = None):
  205.     '''Construct a canonical pathname for the given report.
  206.  
  207.     If uid is not given, it defaults to the uid of the current process.'''
  208.     if report.has_key('ExecutablePath'):
  209.         subject = report['ExecutablePath'].replace('/', '_')
  210.     elif report.has_key('Package'):
  211.         subject = report['Package'].split(None, 1)[0]
  212.     else:
  213.         raise ValueError, 'report has neither ExecutablePath nor Package attribute'
  214.     if not report.has_key('ExecutablePath'):
  215.         uid = os.getuid()
  216.     
  217.     return os.path.join(report_dir, '%s.%i.crash' % (subject, uid))
  218.  
  219.  
  220. def check_files_md5(sumfile):
  221.     """Given a list of MD5 sums in md5sum(1) format (relative to /), check
  222.     integrity of all files and return a list of files that don't match."""
  223.     if not os.path.exists(sumfile):
  224.         raise AssertionError
  225.     m = subprocess.Popen([
  226.         '/usr/bin/md5sum',
  227.         '-c',
  228.         sumfile], stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = True, cwd = '/', env = { })
  229.     out = m.communicate()[0]
  230.     if m.returncode == 0:
  231.         return []
  232.     mismatches = []
  233.     for l in out.splitlines():
  234.         if l.endswith('FAILED'):
  235.             mismatches.append(l.rsplit(':', 1)[0])
  236.             continue
  237.         m.returncode == 0
  238.     
  239.     return mismatches
  240.  
  241.  
  242. def get_config(section, setting, default = None, bool = False):
  243.     '''Return a setting from user configuration.
  244.  
  245.     This is read from ~/.config/apport/settings. If bool is True, the value is
  246.     interpreted as a boolean.
  247.     '''
  248.     if not get_config.config:
  249.         get_config.config = ConfigParser.ConfigParser()
  250.         get_config.config.read(os.path.expanduser(_config_file))
  251.     
  252.     
  253.     try:
  254.         if bool:
  255.             return get_config.config.getboolean(section, setting)
  256.         return get_config.config.get(section, setting)
  257.     except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
  258.         return default
  259.  
  260.  
  261. get_config.config = None
  262. import unittest
  263. import tempfile
  264. import os
  265. import shutil
  266. import sys
  267. import time
  268. from cStringIO import StringIO
  269.  
  270. class _ApportUtilsTest(unittest.TestCase):
  271.     
  272.     def setUp(self):
  273.         global report_dir
  274.         self.orig_report_dir = report_dir
  275.         report_dir = tempfile.mkdtemp()
  276.         self.orig_config_file = _config_file
  277.  
  278.     
  279.     def tearDown(self):
  280.         global report_dir, _config_file
  281.         shutil.rmtree(report_dir)
  282.         report_dir = self.orig_report_dir
  283.         self.orig_report_dir = None
  284.         _config_file = self.orig_config_file
  285.  
  286.     
  287.     def _create_reports(self, create_inaccessible = False):
  288.         '''Create some test reports.'''
  289.         r1 = os.path.join(report_dir, 'rep1.crash')
  290.         r2 = os.path.join(report_dir, 'rep2.crash')
  291.         open(r1, 'w').write('report 1')
  292.         open(r2, 'w').write('report 2')
  293.         os.chmod(r1, 384)
  294.         os.chmod(r2, 384)
  295.         if create_inaccessible:
  296.             ri = os.path.join(report_dir, 'inaccessible.crash')
  297.             open(ri, 'w').write('inaccessible')
  298.             os.chmod(ri, 0)
  299.             return [
  300.                 r1,
  301.                 r2,
  302.                 ri]
  303.         return [
  304.             r1,
  305.             r2]
  306.  
  307.     
  308.     def test_find_package_desktopfile(self):
  309.         '''find_package_desktopfile().'''
  310.         nodesktop = 'bash'
  311.         if not [](_[1]) == 0:
  312.             raise AssertionError
  313.         onedesktop = None
  314.         multidesktop = None
  315.         for d in os.listdir('/usr/share/applications/'):
  316.             pkg = packaging.get_file_package(os.path.join('/usr/share/applications/', d))
  317.             num = [](_[2])
  318.             if not onedesktop and num == 1:
  319.                 onedesktop = pkg
  320.             elif not multidesktop and num > 1:
  321.                 multidesktop = pkg
  322.             
  323.             if onedesktop and multidesktop:
  324.                 break
  325.                 continue
  326.             len if not d.endswith('.desktop') else []
  327.         
  328.         if nodesktop:
  329.             self.assertEqual(find_package_desktopfile(nodesktop), None, 'no-desktop package %s' % nodesktop)
  330.         
  331.         if multidesktop:
  332.             self.assertEqual(find_package_desktopfile(multidesktop), None, 'multi-desktop package %s' % multidesktop)
  333.         
  334.         if onedesktop:
  335.             d = find_package_desktopfile(onedesktop)
  336.             self.assertNotEqual(d, None, 'one-desktop package %s' % onedesktop)
  337.             self.assert_(os.path.exists(d))
  338.             self.assert_(d.endswith('.desktop'))
  339.         
  340.  
  341.     
  342.     def test_likely_packaged(self):
  343.         '''likely_packaged().'''
  344.         self.assertEqual(likely_packaged('/bin/bash'), True)
  345.         self.assertEqual(likely_packaged('/usr/bin/foo'), True)
  346.         self.assertEqual(likely_packaged('/usr/local/bin/foo'), False)
  347.         self.assertEqual(likely_packaged('/home/test/bin/foo'), False)
  348.         self.assertEqual(likely_packaged('/tmp/foo'), False)
  349.         self.assertEqual(likely_packaged('/var/lib/foo'), True)
  350.         self.assertEqual(likely_packaged('/var/lib/schroot/bin/bash'), False)
  351.  
  352.     
  353.     def test_find_file_package(self):
  354.         '''find_file_package().'''
  355.         self.assertEqual(find_file_package('/bin/bash'), 'bash')
  356.         self.assertEqual(find_file_package('/bin/cat'), 'coreutils')
  357.         self.assertEqual(find_file_package('/nonexisting'), None)
  358.  
  359.     
  360.     def test_seen(self):
  361.         '''get_new_reports() and seen_report().'''
  362.         self.assertEqual(get_new_reports(), [])
  363.         self.assertEqual(set(get_new_reports()), set(tr))
  364.         nr = set(tr)
  365.         for r in tr:
  366.             self.assertEqual(seen_report(r), False)
  367.             nr.remove(r)
  368.             mark_report_seen(r)
  369.             self.assertEqual(seen_report(r), True)
  370.             self.assertEqual(set(get_new_reports()), nr)
  371.         
  372.  
  373.     
  374.     def test_get_all_reports(self):
  375.         '''get_all_reports().'''
  376.         self.assertEqual(get_all_reports(), [])
  377.         self.assertEqual(set(get_all_reports()), set(tr))
  378.         for r in tr:
  379.             mark_report_seen(r)
  380.         
  381.         self.assertEqual(set(get_all_reports()), set(tr))
  382.  
  383.     
  384.     def test_get_system_reports(self):
  385.         '''get_all_system_reports() and get_new_system_reports().'''
  386.         self.assertEqual(get_all_reports(), [])
  387.         self.assertEqual(get_all_system_reports(), [])
  388.  
  389.     
  390.     def test_delete_report(self):
  391.         '''delete_report().'''
  392.         tr = self._create_reports()
  393.         while tr:
  394.             self.assertEqual(set(get_all_reports()), set(tr))
  395.             delete_report(tr.pop())
  396.  
  397.     
  398.     def test_get_recent_crashes(self):
  399.         '''get_recent_crashes().'''
  400.         r = StringIO('ProblemType: Crash')
  401.         self.assertEqual(get_recent_crashes(r), 0)
  402.         r = StringIO('ProblemType: Crash\nDate: Wed Aug 01 00:00:01 1990')
  403.         self.assertEqual(get_recent_crashes(r), 0)
  404.         r = StringIO('ProblemType: Crash\nDate: Wed Aug 01 00:00:01 1990\nCrashCounter: 3')
  405.         self.assertEqual(get_recent_crashes(r), 0)
  406.         r = StringIO('ProblemType: Crash\nDate: %s\nCrashCounter: 3' % time.ctime(time.mktime(time.localtime()) - 90000))
  407.         self.assertEqual(get_recent_crashes(r), 0)
  408.         r = StringIO('ProblemType: Crash\nDate: %s\nCrashCounter: 3' % time.ctime(time.mktime(time.localtime()) - 3600))
  409.         self.assertEqual(get_recent_crashes(r), 3)
  410.  
  411.     
  412.     def test_make_report_path(self):
  413.         '''make_report_path().'''
  414.         pr = ProblemReport()
  415.         self.assertRaises(ValueError, make_report_path, pr)
  416.         pr['Package'] = 'bash 1'
  417.         self.assert_(make_report_path(pr).startswith('%s/bash' % report_dir))
  418.         pr['ExecutablePath'] = '/bin/bash'
  419.         self.assert_(make_report_path(pr).startswith('%s/_bin_bash' % report_dir))
  420.  
  421.     
  422.     def test_check_files_md5(self):
  423.         '''check_files_md5().'''
  424.         f1 = os.path.join(report_dir, 'test 1.txt')
  425.         f2 = os.path.join(report_dir, 'test:2.txt')
  426.         sumfile = os.path.join(report_dir, 'sums.txt')
  427.         open(f1, 'w').write('Some stuff')
  428.         open(f2, 'w').write('More stuff')
  429.         open(sumfile, 'w').write('2e41290da2fa3f68bd3313174467e3b5  %s\nf6423dfbc4faf022e58b4d3f5ff71a70  %s\n' % (f1[1:], f2))
  430.         self.assertEqual(check_files_md5(sumfile), [], 'correct md5sums')
  431.         open(f1, 'w').write('Some stuff!')
  432.         self.assertEqual(check_files_md5(sumfile), [
  433.             f1[1:]], 'file 1 wrong')
  434.         open(f2, 'w').write('More stuff!')
  435.         self.assertEqual(check_files_md5(sumfile), [
  436.             f1[1:],
  437.             f2], 'files 1 and 2 wrong')
  438.         open(f1, 'w').write('Some stuff')
  439.         self.assertEqual(check_files_md5(sumfile), [
  440.             f2], 'file 2 wrong')
  441.  
  442.     
  443.     def test_get_config(self):
  444.         '''get_config().'''
  445.         global _config_file, _config_file
  446.         _config_file = '/nonexisting'
  447.         self.assertEqual(get_config('main', 'foo'), None)
  448.         self.assertEqual(get_config('main', 'foo', 'moo'), 'moo')
  449.         get_config.config = None
  450.         f = tempfile.NamedTemporaryFile()
  451.         _config_file = f.name
  452.         self.assertEqual(get_config('main', 'foo'), None)
  453.         self.assertEqual(get_config('main', 'foo', 'moo'), 'moo')
  454.         get_config.config = None
  455.         f.write('[main]\none=1\ntwo = TWO\nb1 = 1\nb2=False\n[spethial]\none= 99\n')
  456.         f.flush()
  457.         self.assertEqual(get_config('main', 'foo'), None)
  458.         self.assertEqual(get_config('main', 'foo', 'moo'), 'moo')
  459.         self.assertEqual(get_config('main', 'one'), '1')
  460.         self.assertEqual(get_config('main', 'one', default = 'moo'), '1')
  461.         self.assertEqual(get_config('main', 'two'), 'TWO')
  462.         self.assertEqual(get_config('main', 'b1', bool = True), True)
  463.         self.assertEqual(get_config('main', 'b2', bool = True), False)
  464.         self.assertEqual(get_config('main', 'b3', bool = True), None)
  465.         self.assertEqual(get_config('main', 'b3', default = False, bool = True), False)
  466.         self.assertEqual(get_config('spethial', 'one'), '99')
  467.         self.assertEqual(get_config('spethial', 'two'), None)
  468.         self.assertEqual(get_config('spethial', 'one', 'moo'), '99')
  469.         self.assertEqual(get_config('spethial', 'nope', 'moo'), 'moo')
  470.         get_config.config = None
  471.         f.close()
  472.  
  473.  
  474. if __name__ == '__main__':
  475.     unittest.main()
  476.  
  477.